LSE ST510: Foundations of Machine Learning¶

Week 10: Reinforcement Learning¶

This week's seminar is concerned with reinforcement learning. We will demonstrate the standard Q-learning algorithm as well as deep Q-learning using OpenAI Gym.

Further, we will explore some classical tasks from the book Reinforcement Learning: An Introduction by Sutton and Barto.

In [ ]:
from IPython.display import HTML, display

import warnings

warnings.filterwarnings("ignore")
In [ ]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import rc
import gym

Multi-armed bandit¶

In [ ]:
np.random.seed(10)
bandit_probabilities = [0.10, 0.40, 0.10, 0.10]
number_of_bandits = len(bandit_probabilities)
action_space = np.arange(number_of_bandits)  # =[0, 1, 2, 3]
number_of_trials = 20
timesteps = 1000

arms = np.zeros(timesteps, dtype=int)


def step(action):
    rand = np.random.random()  # [0.0,1.0)
    reward = 1.0 if (rand < bandit_probabilities[action]) else 0.0
    return reward


def cumulative_average_mean(r, n):
    return np.cumsum(r) / np.cumsum(np.ones(n))


def compute_regret(bandit_probabilities, arms, time_steps):
    probs = [bandit_probabilities[arm] for arm in arms]
    optimal_value = np.cumsum(np.ones(time_steps)) * np.amax(bandit_probabilities)
    policy_value = np.cumsum(probs)
    return optimal_value - policy_value
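As a sanity check, consider a hypothetical helper that mirrors compute_regret above: a policy that always pulls the best arm incurs zero regret, while always pulling a suboptimal arm accumulates regret linearly.

```python
import numpy as np

probs = [0.10, 0.40, 0.10, 0.10]   # bandit_probabilities from above
best = probs.index(max(probs))     # arm 1

def regret(arms, t):
    # expected reward of the best arm minus that of the arms actually pulled
    chosen = [probs[a] for a in arms]
    return np.cumsum(np.ones(t)) * max(probs) - np.cumsum(chosen)

print(regret([best] * 5, 5))   # ~0 at every step
print(regret([0] * 5, 5))      # grows by 0.3 per step
```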
In [ ]:
def epsilon_greedy_policy(epsilon, actions, q_values):
    if np.random.binomial(1, epsilon) == 1:
        return np.random.choice(actions)
    else:
        return np.random.choice(
            [
                action_
                for action_, value_ in enumerate(q_values)
                if value_ == np.max(q_values)
            ]
        )


def apply_epsilon_greedy(n_bandits, action_space, n_trials, timesteps, epsilons, arms):
    rewards = np.zeros((len(epsilons), n_trials, timesteps), dtype=float)
    regrets = np.zeros((len(epsilons), n_trials, timesteps), dtype=float)

    for eps_counter, eps in enumerate(epsilons):
        for trial in range(n_trials):
            n = np.zeros(n_bandits, dtype=int)
            q = np.zeros(n_bandits, dtype=float)
            for t in range(timesteps):

                """
                This is where the magic happens:
                1. Select an action using the current guess and an eps-policy
                2. Execute that action
                3. Update your beliefs

                Extra: Optimism and Upper Confidence bounds
                """
                action = epsilon_greedy_policy(eps, action_space, q)

                r = step(action)

                # updating action counter and Q
                q[action] = q[action] + (r - q[action]) / (
                    n[action] + 1.0
                )  # update value in a online manner
                n[action] += 1

                rewards[eps_counter, trial, t] += r
                arms[t] = action

            regret = compute_regret(bandit_probabilities, arms, timesteps)
            regrets[eps_counter, trial, :] += regret

    rewards = np.mean(rewards, axis=1)
    regrets = np.mean(regrets, axis=1)

    rewards = list(rewards)
    regrets = list(regrets)

    legend_entries = [
        r"$\epsilon$-greedy $\epsilon=" + str(eps) + "$" for eps in epsilons
    ]

    rewards = list(zip(rewards, legend_entries))
    regrets = list(zip(regrets, legend_entries))

    return rewards, regrets
In [ ]:
epsilons = [0.0, 0.01, 0.1, 0.9]
rewards_epsilon_greedy, regrets_epsilon_greedy = apply_epsilon_greedy(
    number_of_bandits, action_space, number_of_trials, timesteps, epsilons, arms
)
In [ ]:
def plot_rewards(reward_list, n):
    for r in reward_list:
        plt.plot(cumulative_average_mean(r[0], n), linewidth=3)
    plt.xlabel("Steps")
    plt.ylabel("Average reward")
    legend_str = [r[1] for r in reward_list]
    plt.legend(legend_str)
    plt.show()

def plot_regrets(regrets):
    for a in regrets:
        plt.plot(a[0], linewidth=3)
    plt.xlabel("Steps")
    plt.ylim(0.0, 50.0)
    plt.ylabel("Regret")
    legend_str = [r[1] for r in regrets]
    plt.legend(legend_str)
    plt.show()

plot_rewards(rewards_epsilon_greedy, timesteps)
plot_regrets(regrets_epsilon_greedy)

Monte Carlo methods¶

Monte Carlo prediction¶

In Monte Carlo methods, the value function of a state is estimated by the average of the returns following each visit or each first visit to that state in a set of episodes.

Incremental updates¶

Incremental mean: $$\mu_k=\frac{1}{k}\sum_{j=1}^kx_j=\mu_{k-1}+\frac{1}{k}(x_k-\mu_{k-1})$$

Incremental MC updates: $$v^j(s_t)=v^{j-1}(s_t)+\frac{1}{N(s_t)}(G^j_t-v^{j-1}(s_t))$$

where $G^j_t$ is the return following time $t$ in the $j$-th episode and $N(s_t)$ is the number of times state $s_t$ has been visited - the every-visit MC method.

In [ ]:
def mc_policy_evaluation(state_count, r, value):
    return value + (r - value) / state_count
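As a quick check (restating the helper so the snippet is self-contained), the incremental rule reproduces the plain average of the observed returns:

```python
import numpy as np

def mc_policy_evaluation(state_count, r, value):
    return value + (r - value) / state_count

returns = [1.0, -1.0, 0.0, 1.0]   # hypothetical returns observed for one state
value, count = 0.0, 0
for g in returns:
    count += 1
    value = mc_policy_evaluation(count, g, value)

# value now equals the batch mean of the returns
assert np.isclose(value, np.mean(returns))
```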

Application: Blackjack¶

Blackjack is a card game where the goal is to beat the dealer by obtaining cards whose sum is closer to 21 (without going over 21) than the dealer's cards.

In [ ]:
np.random.seed(123)
env = gym.make("Blackjack-v1", natural=False, sab=False)

The observation consists of a 3-tuple containing:

  • the player’s current sum,
  • the value of the dealer’s one showing card (1-10 where 1 is ace),
  • and whether the player holds a usable ace (0 or 1).

Actions:

  • stick (0),
  • and hit (1)

Rewards:

  • win game: +1
  • lose game: -1
  • draw game: 0

We would like to evaluate the following policy:

In [ ]:
def policy(hand_sum):
    if hand_sum > 18:
        return 0
    else:
        return 1
In [ ]:
values_usable_ace = np.zeros((10, 10))
values_no_usable_ace = np.zeros_like(values_usable_ace)
state_count_ace = np.zeros_like(values_usable_ace)
state_count_no_ace = np.zeros_like(state_count_ace)
episodes = 100000

for e in range(episodes):
    done = False
    obs = env.reset(seed=e)
    state_history = []
    g = []

    obs = obs[0]
    # hands with a player sum below 12 would index outside the 10x10 value
    # grid (and hitting on them can never bust), so skip these episodes
    if obs[0] < 12:
        continue

    state_history.append(obs)
    """
    We first play a game from beginning to end, following the policy that we need to evaluate
    """
    while not done:
        a = policy(obs[0])
        obs, r, done, truncated, info = env.step(a)
        g.append(r)
        if done:
            break
        state_history.append(obs)

    final_reward = sum(g)

    """
    We then combine the information of a whole episode together,
    to get an estimate of the value of being in a certain state
    and following the given policy.
    
    Remark: this is an UNBIASED estimator
    """
    for player_idx, dealer_idx, ace in state_history:
        player_idx -= 12
        dealer_idx -= 1

        if ace:
            state_count_ace[player_idx, dealer_idx] += 1.0
            values_usable_ace[player_idx, dealer_idx] = mc_policy_evaluation(
                state_count_ace[player_idx, dealer_idx],
                final_reward,
                values_usable_ace[player_idx, dealer_idx],
            )
        else:
            state_count_no_ace[player_idx, dealer_idx] += 1.0
            values_no_usable_ace[player_idx, dealer_idx] = mc_policy_evaluation(
                state_count_no_ace[player_idx, dealer_idx],
                final_reward,
                values_no_usable_ace[player_idx, dealer_idx],
            )
In [ ]:
from mpl_toolkits.mplot3d import Axes3D


def plot_v(values, ace=True):
    """
    Visualize the value function
    """
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(projection="3d")
    ax.view_init(elev=15, azim=0, roll=0)

    x = np.arange(12, 22)
    y = np.arange(1, 11)

    X, Y = np.meshgrid(y, x)

    Z = values.reshape(X.shape)

    ax.plot_surface(X, Y, Z, cmap=plt.cm.coolwarm, linewidth=1, rstride=1, cstride=1)
    if ace:
        ax.set_title("With Ace")
    else:
        ax.set_title("Without Ace")
    ax.set_xlabel("Dealer Showing")
    ax.set_ylabel("Player Hand")
    ax.set_zlabel("State Value")
    plt.show()
In [ ]:
plot_v(values_usable_ace)
plot_v(values_no_usable_ace, ace=False)

Generalization and Function Approximation¶

Next, we will practice solving reinforcement problems by using function approximation. Specifically, we will consider the following exercise:

  • The Mountain Car example: linear function approximator
In [ ]:
import sklearn.pipeline
import sklearn.preprocessing
from sklearn.kernel_approximation import RBFSampler

Before looking into our exercises, we provide a summary of some basic concepts.

A quick summary of some basic concepts¶

On-policy vs off-policy methods¶

On-policy methods aim to estimate the value of the policy that is used for control. Examples of on-policy methods are:

  • On-policy MC control
  • On-policy TD control: SARSA

The updates of action estimates according to SARSA are of the following form:

$$ Q(s_t,a_t){\leftarrow}Q(s_t,a_t)+\alpha[r_{t+1}+{\gamma}Q(s_{t+1},a_{t+1})-Q(s_t,a_t)] $$

An on-policy method updates action value estimates using the values of next state-action pairs where the next actions are generated by the current policy.

In off-policy methods, the behaviour policy that generates actions may differ from the target policy whose values are being estimated. Examples of off-policy methods include:

  • Off-policy TD control: Q-learning

The updates of action estimates according to Q-learning are of the following form:

$$ Q(s_t,a_t){\leftarrow}Q(s_t,a_t)+\alpha[r_{t+1}+{\gamma}\max_{a}Q(s_{t+1},a)-Q(s_t,a_t)] $$

Q-learning updates action value estimates using the values of next state-action pairs where the next actions are generated by a greedy policy. That is to say, the action-state values are estimated under the assumption of a greedy policy despite the fact that it is not following a greedy policy.
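The two updates differ only in the bootstrap term, which a minimal tabular sketch with a hypothetical Q-table makes explicit:

```python
import numpy as np

alpha, gamma = 0.5, 0.9
Q = np.array([[0.0, 1.0],    # hypothetical Q[s, a] table: 2 states, 2 actions
              [2.0, 4.0]])

s, a, r, s_next, a_next = 0, 0, 1.0, 1, 0   # a_next comes from the behaviour policy

# SARSA bootstraps on the action actually taken next (on-policy)
sarsa_target = r + gamma * Q[s_next, a_next]    # 1 + 0.9 * 2 = 2.8
# Q-learning bootstraps on the greedy action (off-policy)
q_target = r + gamma * Q[s_next].max()          # 1 + 0.9 * 4 = 4.6

Q_sarsa = Q[s, a] + alpha * (sarsa_target - Q[s, a])
Q_qlearn = Q[s, a] + alpha * (q_target - Q[s, a])
```

The two rules coincide exactly when the behaviour policy happens to pick the greedy action as its next action.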

The Mountain Car Problem¶

The Mountain Car problem is described in Example 10.1 (2nd edition) of Sutton & Barto. The basic idea is to drive the car to the goal position by building up momentum. We will use the existing environment in OpenAI Gym (ID: MountainCar-v0).

  • Actions = [push left (0), no push (1), push right (2)]
  • Reward is -1 on all time steps until the car reaches its goal
  • The state space is continuous
  • The states have two dimensions: state = [position, velocity]
  • Position: [-1.2, 0.6]
  • Velocity: [-0.07, 0.07]
  • The goal is to reach the flag position (0.5)
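For reference, the underlying dynamics (as given in Sutton & Barto, Example 10.1, which the Gym implementation follows; treat the constants as an assumption about the installed version) can be sketched in a few lines, with mountain_car_step a hypothetical helper:

```python
import math

def mountain_car_step(position, velocity, action):
    # action: 0 = push left, 1 = no push, 2 = push right
    velocity += 0.001 * (action - 1) - 0.0025 * math.cos(3 * position)
    velocity = min(max(velocity, -0.07), 0.07)           # clip to velocity bounds
    position = min(max(position + velocity, -1.2), 0.6)  # clip to position bounds
    if position == -1.2 and velocity < 0:                # inelastic left wall
        velocity = 0.0
    return position, velocity

pos, vel = mountain_car_step(-0.5, 0.0, 2)  # push right near the valley floor
```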

Normalize data¶

As the RBF kernel assumes all features are centred around 0 and have variance in the same order, we will normalize the data as the first step. For the task of normalization and feature engineering, we will use the scikit-learn library.

As this method falls into the category of online learning, we will define a normalizer and feature extractor using some pre-training data as follows:

We randomly sample 10000 observations as pre-training data for normalization.

In [ ]:
openai_env = "MountainCar-v0"
env = gym.make(openai_env, render_mode="rgb_array")
n_actions = env.action_space.n
actions = np.arange(n_actions)
sample_size = 10000

# Note: Alternatively create samples via env.observation_space.sample()

obs_samples = []
num_samples = 0
while num_samples < sample_size:
    env.reset()
    done = False
    while not done:
        (
            observation,
            _,
            done,
            _,
            _,
        ) = env.step(env.action_space.sample())
        obs_samples.append(observation)
        num_samples += 1
# obs_samples = np.array([env.observation_space.sample() for i in range(sample_size)])
# print(obs_samples)

Control with function approximation¶

We approximate the action-value function by a linear combination of features, i.e.,

$$\hat{Q}(s,a,\mathbf{w})=\phi(s,a)^{\top}\mathbf{w}$$

The objective function is the MSE between the true action value $Q(s,a)$ and $\hat{Q}(s,a,\mathbf{w})$:

$$ \mathcal{L(\mathbf{w})}=\frac{1}{2}\mathbb{E}_{s,a,r,s'}[(Q(s,a)-\hat{Q}(s,a,\mathbf{w}))^2] $$

Differentiating the objective w.r.t. the parameter $\mathbf{w}$, we obtain the parameter update

$$ \mathbf{w}_{t+1}=\mathbf{w}_t+{\eta}(Q(s_t,a_t)-\hat{Q}(s_t,a_t,\mathbf{w}_t))\phi(s_t,a_t) $$

Unfortunately, we don't have access to the true $Q(s,a)$ in practice. We therefore substitute a target for it.

We can substitute a SARSA target,

$$Q(s_t,a_t)=r_{t}+\gamma\hat{Q}(s_{t+1},a_{t+1},\mathbf{w}_t)$$

or a Q-learning target,

$$ Q(s_t,a_t)=r_{t}+\gamma\max_{a'}\hat{Q}(s_{t+1},a',\mathbf{w}_t) $$

In the following implementation, we choose the SARSA target as a substitution to the true state-action values. The update for parameter $\mathbf{w}$ is therefore

$$ \mathbf{w}_{t+1}=\mathbf{w}_t+{\eta}(r_{t}+\gamma\hat{Q}(s_{t+1},a_{t+1},\mathbf{w}_t)-\hat{Q}(s_t,a_t,\mathbf{w}_t))\phi(s_t,a_t) $$
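As a quick numeric sketch of this update rule (hypothetical feature vectors and learning rate; the Approximator class in the next cell implements the same idea with RBF features):

```python
import numpy as np

eta, gamma = 0.1, 1.0
w = np.zeros(3)                        # weights for one action (one vector per action)
phi = np.array([0.2, 0.5, 0.3])        # hypothetical features phi(s_t, a_t)
phi_next = np.array([0.1, 0.4, 0.5])   # hypothetical features phi(s_{t+1}, a_{t+1})
r = -1.0                               # Mountain Car reward

q = w @ phi                            # Q_hat(s_t, a_t, w)
q_next = w @ phi_next                  # bootstrap value Q_hat(s_{t+1}, a_{t+1}, w)
td_error = r + gamma * q_next - q      # equals -1.0 on this first update (w = 0)
w = w + eta * td_error * phi           # move w along the feature direction
```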
In [ ]:
class Approximator:
    def __init__(self, obs, dim, n_a, alpha, discount):
        self.__discount = discount
        self.__alpha = alpha
        self.__n_actions = n_a
        self.__obs_samples = obs
        self.__feature_dim = dim
        self.__w_size = 4 * self.__feature_dim
        self.__w = np.zeros((self.__n_actions, self.__w_size))
        self.__scaler = None
        self.__featuriser = None
        self.__initialised = False

    def initialise_scaler_featuriser(self):
        self.__scaler = sklearn.preprocessing.StandardScaler().fit(self.__obs_samples)
        self.__featuriser = sklearn.pipeline.FeatureUnion(
            [
                ("rbf1", RBFSampler(gamma=5.0, n_components=self.__feature_dim)),
                ("rbf2", RBFSampler(gamma=2.0, n_components=self.__feature_dim)),
                ("rbf3", RBFSampler(gamma=1.0, n_components=self.__feature_dim)),
                ("rbf4", RBFSampler(gamma=0.5, n_components=self.__feature_dim)),
            ]
        )
        self.__featuriser.fit(self.__scaler.transform(self.__obs_samples))
        self.__initialised = True

    @property
    def get_w(self):
        return self.__w

    def feature_transformation(self, state):
        if not self.__initialised:
            self.initialise_scaler_featuriser()

        scaled = self.__scaler.transform([state])
        features = self.__featuriser.transform(scaled)
        return features

    # linear_features
    def action_value_estimator(self, features, a):
        return np.inner(features, self.__w[a])

    # minimizing MSE between q(replaced by td target) and q_hat
    def update_w(self, r, q, next_q, features, a):
        target = r + self.__discount * next_q
        td_error = target - q
        w_gradient = self.__alpha * td_error * features
        self.__w[a] = self.__w[a] + w_gradient

    def set_w(self, a, new_w):
        self.__w[a] = new_w

    def cost_to_go(self, state):
        features = self.feature_transformation(state)
        v_s = []
        for i in range(self.__n_actions):
            v_s.append(self.action_value_estimator(features, i))
        return -np.max(v_s)
In [ ]:
def plot_v(estimator, i=None):
    if openai_env == "MountainCar-v0":
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(projection="3d")

        x = np.linspace(-1.2, 0.6, 30)
        y = np.linspace(-0.07, 0.07, 30)

        X, Y = np.meshgrid(x, y)
        states = np.dstack((X, Y))

        values = np.apply_along_axis(estimator.cost_to_go, 2, states)

        ax.plot_surface(
            X, Y, values, cmap=plt.cm.coolwarm, linewidth=1, rstride=1, cstride=1
        )

        extra_title = ""
        if i:
            extra_title = " after {} iterations".format(i)
        ax.set_title("Cost to go function" + extra_title)
        ax.set_xlabel("Position")
        ax.set_ylabel("Velocity")
        ax.set_zlabel("Value")
        plt.show()

Solve the problem¶

Based on the randomly-sampled 10000 observations, create an instance of the state-action function approximator and initialize the normaliser and feature extractor:

In [ ]:
alpha = 0.1
gamma = 1.0
epsilon = 0.1
dim = 100

# create an instance of approximator and initialize the normalizer and feature extractor
estimator = Approximator(obs_samples, dim, n_actions, alpha, gamma)
estimator.initialise_scaler_featuriser()

Play the game and perform online updates of the parameter $\mathbf{w}$:

In [ ]:
def epsilon_greedy_policy(epsilon, actions, values):
    if np.random.binomial(1, epsilon) == 1:
        return np.random.choice(actions)
    else:
        return np.random.choice(
            [
                action_
                for action_, value_ in enumerate(values)
                if value_ == np.max(values)
            ]
        )
In [ ]:
episodes = 50
for i in range(1, episodes + 1):
    state = env.reset()[0]
    a = env.action_space.sample()
    step_count = 0
    while True:
        step_count += 1
        """
        At each step, we first perform our action a (chosen in an
        epsilon-greedy way). This is possible because we can query
        our black box as many times as we want to get a guess of the
        value of each action.
        Choosing a was done at the previous step.
        """
        (
            next_state,
            r,
            done,
            _,
            _,
        ) = env.step(a)

        if done:
            break

        """
        We query our black box for a guess of the value Q(s,a)
        of our eps-greedy action a.

        The feature transformation is needed because the state space
        is continuous: instead of a lookup table, we represent states
        by RBF features and estimate Q as a linear function of them.
        """
        features = estimator.feature_transformation(state)
        q_sa = estimator.action_value_estimator(features, a)
        next_feature = estimator.feature_transformation(next_state)
        """
        We need a guess of what the optimal policy would be,
        i.e. we need an epsilon-greedy strategy around the current
        estimates. We ask the black box for the estimated value of
        each action in the next state, which lets us do something
        similar to value iteration.
        """

        q_values = []
        for j in actions:
            q_values.append(estimator.action_value_estimator(next_feature, j))

        """
        We now select the next action epsilon-greedily with respect
        to our guesses.
        """
        next_a = epsilon_greedy_policy(epsilon, actions, q_values)
        next_q_sa = estimator.action_value_estimator(next_feature, next_a)

        """
        Here we update the weights, using the observed reward on one
        side and our bootstrapped guesses on the other.
        """
        estimator.update_w(r, q_sa, next_q_sa, features, a)

        a = next_a
        state = next_state
    if i % 10 == 0:
        plot_v(estimator, i)

Results¶

In [ ]:
plot_v(estimator)
In [ ]:
import matplotlib.animation
import matplotlib
from IPython.display import HTML

rewards = 0
env.reset()
frames = []
action = env.action_space.sample()
while True:
    (
        s,
        r,
        done,
        _,
        _,
    ) = env.step(action)
    frames.append(env.render())
    rewards = rewards + r
    if done:
        break

    feature = estimator.feature_transformation(s)
    q_values = []
    for j in actions:
        q_values.append(estimator.action_value_estimator(feature, j))

    action = epsilon_greedy_policy(0, actions, q_values)
print(rewards)

plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis("off")
animate = lambda i: patch.set_data(frames[i])
ani = matplotlib.animation.FuncAnimation(
    plt.gcf(), animate, frames=len(frames), interval=50
)
HTML(ani.to_jshtml())
-131.0
Out[ ]:

Actor-Critic methods¶

Actor-Critic methods are policy gradient methods that represent the policy function independently of the value function.

A policy function (or policy) returns a probability distribution over actions that the agent can take based on the given state. A value function determines the expected return for an agent starting at a given state and acting according to a particular policy forever after.

In the Actor-Critic method, the policy is referred to as the actor that proposes a set of possible actions given a state, and the estimated value function is referred to as the critic, which evaluates actions taken by the actor based on the given policy.

In this tutorial, both the Actor and Critic will be represented using neural networks.

CartPole-v1

In the CartPole-v1 environment, a pole is attached to a cart moving along a frictionless track. The pole starts upright and the goal of the agent is to prevent it from falling over by applying a force of -1 or +1 to the cart. A reward of +1 is given for every time step the pole remains upright. An episode ends when (1) the pole is more than 12 degrees from vertical or (2) the cart moves more than 2.4 units from the center.

Trained actor-critic model in Cartpole-v1 environment

The problem is considered "solved" when the average total reward per episode reaches 475 over 100 consecutive trials.

The code below is from this repository

Setup¶

Import necessary packages and configure global settings.

In [ ]:
import gym, os
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make(
    "CartPole-v1",
).unwrapped

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
lr = 0.0001

Model¶

The Actor and Critic will be modeled using neural networks that generate the action probabilities and the critic value, respectively.

During the forward pass, each network takes the state as input: the actor outputs action probabilities and the critic outputs the value $V$, which models the state-dependent value function. The goal is to train a model that chooses actions based on a policy $\pi$ that maximizes expected return.

For Cartpole-v1, there are four values representing the state: cart position, cart-velocity, pole angle and pole velocity respectively. The agent can take two actions to push the cart left (0) and right (1) respectively.

In [ ]:
torch.manual_seed(2024510)


class Actor(nn.Module):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = nn.Linear(self.state_size, 128)
        self.linear2 = nn.Linear(128, 256)
        self.linear3 = nn.Linear(256, self.action_size)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        output = self.linear3(output)
        distribution = Categorical(F.softmax(output, dim=-1))
        return distribution


class Critic(nn.Module):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = nn.Linear(self.state_size, 128)
        self.linear2 = nn.Linear(128, 256)
        self.linear3 = nn.Linear(256, 1)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        value = self.linear3(output)
        return value

Computing expected returns¶

The sequence of rewards for each timestep $t$, $\{r_{t}\}^{T}_{t=1}$ collected during one episode is converted into a sequence of expected returns $\{G_{t}\}^{T}_{t=1}$ in which the sum of rewards is taken from the current timestep $t$ to $T$ and each reward is multiplied with an exponentially decaying discount factor $\gamma$:

$$G_{t} = \sum^{T}_{t'=t} \gamma^{t'-t}r_{t'}$$

Since $\gamma\in(0,1)$, rewards further out from the current timestep are given less weight.

Intuitively, discounting means that rewards now are worth more than rewards later. In a mathematical sense, it also ensures that the sum of rewards converges.

In [ ]:
def compute_returns(next_value, rewards, masks, gamma=0.99):
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns
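The backward recursion can be checked on a toy episode with plain floats (restated here so the snippet is self-contained; the torch version behaves identically):

```python
def compute_returns(next_value, rewards, masks, gamma=0.99):
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns

# Three steps with reward 1; the last step terminates the episode (mask 0),
# so the bootstrap value next_value is cut off there.
returns = compute_returns(next_value=5.0, rewards=[1.0, 1.0, 1.0], masks=[1, 1, 0])
# returns ~ [2.9701, 1.99, 1.0]
```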

The actor-critic loss¶

Actor loss¶

The actor loss is based on policy gradients with the critic as a state dependent baseline and computed with single-sample (per-episode) estimates.

$$L_{actor} = -\sum^{T}_{t=1} \log\pi_{\theta}(a_{t} | s_{t})[G(s_{t}, a_{t}) - V^{\pi}_{\theta}(s_{t})]$$

where:

  • $T$: the number of timesteps per episode, which can vary per episode
  • $s_{t}$: the state at timestep $t$
  • $a_{t}$: chosen action at timestep $t$ given state $s$
  • $\pi_{\theta}$: is the policy (actor) parameterized by $\theta$
  • $V^{\pi}_{\theta}$: is the value function (critic) also parameterized by $\theta$
  • $G = G_{t}$: the expected return for a given state, action pair at timestep $t$

The sum is negated since the idea is to maximize the probabilities of actions yielding higher rewards by minimizing the combined loss.
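With toy numbers (all hypothetical), the loss reduces to a negated sum of log-probabilities weighted by advantages. Note the training loop later averages instead of summing, which only rescales the gradient:

```python
import math

# hypothetical per-step quantities collected during one episode
log_probs = [math.log(0.6), math.log(0.7), math.log(0.5)]  # log pi(a_t | s_t)
returns   = [3.0, 2.0, 1.0]                                # G_t
values    = [2.5, 2.2, 0.5]                                # V(s_t) from the critic

advantages = [g - v for g, v in zip(returns, values)]      # ~[0.5, -0.2, 0.5]
actor_loss = -sum(lp * adv for lp, adv in zip(log_probs, advantages))
```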


Advantage¶

The $G - V$ term in our $L_{actor}$ formulation is called the advantage, which indicates how much better an action is given a particular state over a random action selected according to the policy $\pi$ for that state.

While it's possible to exclude a baseline, doing so may result in high variance during training. The nice thing about choosing the critic $V$ as a baseline is that it is trained to be as close as possible to $G$, leading to lower variance.

In addition, without the critic, the algorithm would try to increase probabilities for actions taken on a particular state based on expected return, which may not make much of a difference if the relative probabilities between actions remain the same.

For instance, suppose that two actions for a given state would yield the same expected return. Without the critic, the algorithm would try to raise the probability of both actions based on their expected return. With the critic, it may turn out that there's no advantage ($G - V = 0$), so there is no benefit in increasing the actions' probabilities and the algorithm would set the gradients to zero.


Critic loss¶

Training $V$ to be as close as possible to $G$ can be set up as a regression problem with the following loss function:

$$L_{critic} = L_{\delta}(G, V^{\pi}_{\theta})$$

where $L_{\delta}$ is the squared-error loss, implemented as advantage.pow(2).mean() in the training loop below.

Training¶

To train the agent, you will follow these steps:

  1. Run the agent on the environment to collect training data per episode.
  2. Compute expected return at each time step.
  3. Compute the loss for the actor and critic models.
  4. Compute gradients and update network parameters.
  5. Repeat 1-4 until either success criterion or max episodes has been reached.

Note that this method differs algorithmically from the lecture and from the Sutton & Barto book: there, the weights are updated online after each step, whereas the implementation in this notebook only updates the parameters at the end of each episode.

In [ ]:
def trainIters(actor, critic, n_iters):
    optimizerA = optim.Adam(actor.parameters())
    optimizerC = optim.Adam(critic.parameters())
    for iter in range(n_iters):
        state = env.reset(seed=iter)
        state = state[0]
        log_probs = []
        values = []
        rewards = []
        masks = []

        for i in count():
            state = torch.FloatTensor(state).to(device)
            dist, value = actor(state), critic(state)

            action = dist.sample()
            next_state, reward, done, _, _ = env.step(action.cpu().numpy())

            log_prob = dist.log_prob(action).unsqueeze(0)

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1 - done], dtype=torch.float, device=device))

            state = next_state

            if done:
                if iter % 10 == 0:
                    print("Iteration: {}, Score: {}".format(iter, i))
                break

        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = compute_returns(next_value, rewards, masks)

        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)

        advantage = returns - values

        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()

        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()
    env.close()
In [ ]:
import warnings

warnings.filterwarnings("ignore")
actor = Actor(state_size, action_size).to(device)
critic = Critic(state_size, action_size).to(device)
trainIters(actor, critic, n_iters=200)
Iteration: 0, Score: 11
Iteration: 10, Score: 19
Iteration: 20, Score: 12
Iteration: 30, Score: 11
Iteration: 40, Score: 24
Iteration: 50, Score: 22
Iteration: 60, Score: 21
Iteration: 70, Score: 10
Iteration: 80, Score: 42
Iteration: 90, Score: 27
Iteration: 100, Score: 27
Iteration: 110, Score: 93
Iteration: 120, Score: 99
Iteration: 130, Score: 145
Iteration: 140, Score: 615
Iteration: 150, Score: 427
Iteration: 160, Score: 293
Iteration: 170, Score: 711
Iteration: 180, Score: 405
Iteration: 190, Score: 578

Integrated Planning, Acting, and Learning¶

The content of this notebook accompanies Chapter 8 in the Sutton & Barto book.